
Real-time Inference

In this section, we design real-time inference using the Seshat SDK. We assume the recommendation feature view discussed earlier; make sure you understand it before continuing.

Feature View Online Mode

Every feature view has an online and an offline mode. For inference, we need to set online to True. We reuse the recommendation feature view we already defined and add an online_source and an online_pipeline.

Responsibility & Overview

We want to create a feature view that, given a target input address, fetches similar addresses from the database, along with features for each address and token. The result should look like this:

| address   | token  | address_sent_count | address_received_count | token_sender_count | token_receiver_count |
| --------- | ------ | ------------------ | ---------------------- | ------------------ | -------------------- |
| address_1 | token1 | 0                  | 20                     | 3000               | 1000                 |

This is an example of the data we should get. From this dataset, we know which tokens each user interacts with, some features about that address's transactions with each token, like address_sent_count and address_received_count, and some features that relate only to the token, like token_sender_count and token_receiver_count.

Define Source

First, we should define the source that fetches the data. Our feature view already has a saver that writes the data into the SQL database, so here we use a source that reads the data back from that SQL database.

Now consider the database query. It should fetch the 20 addresses most similar to the target address, something like this:

SELECT DISTINCT CASE
         WHEN address_1 = 'target_address' THEN address_2
         ELSE address_1
       END AS address,
       cosine
FROM cosine_sim
WHERE address_1 = 'target_address'
   OR address_2 = 'target_address'
ORDER BY cosine DESC
LIMIT 20;
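To see the CASE logic in action, here is a self-contained demo against an in-memory SQLite database. The table contents are made up for illustration; the point is that the target address is folded out of either column, leaving only its neighbors:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE cosine_sim (address_1 TEXT, address_2 TEXT, cosine REAL)")
conn.executemany(
    "INSERT INTO cosine_sim VALUES (?, ?, ?)",
    [
        ("target_address", "addr_a", 0.9),  # target in the first column
        ("addr_b", "target_address", 0.8),  # target in the second column
        ("addr_c", "addr_d", 0.99),         # unrelated pair, filtered out
    ],
)

rows = conn.execute("""
    SELECT DISTINCT CASE
             WHEN address_1 = 'target_address' THEN address_2
             ELSE address_1
           END AS address,
           cosine
    FROM cosine_sim
    WHERE address_1 = 'target_address'
       OR address_2 = 'target_address'
    ORDER BY cosine DESC
    LIMIT 20;
""").fetchall()
print(rows)  # [('addr_a', 0.9), ('addr_b', 0.8)]
```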

But this query needs to know the target_address, so we wrap it in a function:

def get_similar(target_address, limit, *args, **kwargs):
    return f"""
    SELECT DISTINCT CASE
             WHEN address_1 = '{target_address}' THEN address_2
             ELSE address_1
           END AS address,
           cosine
    FROM cosine_sim
    WHERE address_1 = '{target_address}'
       OR address_2 = '{target_address}'
    ORDER BY cosine DESC
    LIMIT {limit};
    """

Now the query function is ready, but one question remains: how do we integrate it with the feature view and source?

As you may know, the feature view accepts *args and **kwargs and passes them to all of its callable attributes, such as the splitter, the transformer, and the source.

So if you pass limit and target_address when calling the feature view, you can be sure these arguments reach the get_similar function.
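To make the argument forwarding concrete, here is a minimal sketch of the mechanism. These SketchSource and SketchFeatureView classes are illustrative stand-ins, not the actual Seshat implementation, and get_similar is trimmed down to its interpolation:

```python
class SketchSource:
    """Stand-in for a SQL source that renders its query via a query function."""

    def __init__(self, query_fn):
        self.query_fn = query_fn

    def fetch(self, *args, **kwargs):
        # Delegate SQL rendering to the query function, forwarding all arguments.
        return self.query_fn(*args, **kwargs)


class SketchFeatureView:
    """Stand-in for a feature view that forwards call arguments to its source."""

    def __init__(self, source):
        self.source = source

    def __call__(self, *args, **kwargs):
        # All call arguments pass through to the callable attributes untouched.
        return self.source.fetch(*args, **kwargs)


def get_similar(target_address, limit, *args, **kwargs):
    # Trimmed-down query function: only the interpolated parts are shown.
    return f"... WHERE address_1 = '{target_address}' ... LIMIT {limit};"


view = SketchFeatureView(SketchSource(get_similar))
sql = view(target_address="0xabc", limit=10)
print(sql)
```

The keyword arguments given at call time end up, unchanged, as the parameters of the query function.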

So we add an online_source attribute to the feature view:

class TokenRecommendation(FeatureView):
    online = True

    online_source = SQLDBSource(
        url=DB_URL, table_name="cosine_sim", query_fn=get_similar
    )

Calling the view then passes limit and target_address to it:

target_address = "some_address"
limit = 10

view = TokenRecommendation()
view(limit=limit, target_address=target_address)

Online Pipeline

Now it is time to design the online pipeline. We use the FromSQLDBDeriver to add more columns to the fetched sf.

The queries in this section use the IN clause, which requires formatting a list of items into a string SQL understands. The function below does exactly that:

from typing import Iterable


def join_as_string(values: Iterable[str], delimiter: str = ","):
    return delimiter.join(map(lambda addr: f"'{addr}'", values))
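For example (repeating the helper so this snippet is self-contained):

```python
from typing import Iterable


def join_as_string(values: Iterable[str], delimiter: str = ","):
    # Quote each value and join them so the result fits inside an IN (...) clause.
    return delimiter.join(map(lambda addr: f"'{addr}'", values))


print(join_as_string(["addr_1", "addr_2", "addr_3"]))
# 'addr_1','addr_2','addr_3'
```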
  • address-token: We first want to add the tokens that the user has interacted with, along with sent_count and received_count features.

    The query needs to know the similar addresses. We know that FromSQLDBDeriver passes the default sf to its get_query_fn, so defining the query takes little work:

    def top_token_query(default, *args, **kwargs):
        TOP_TOKEN_QUERY = """
        SELECT DISTINCT *
        FROM top_token
        WHERE address IN ({joined_address_str})
        """

        if len(default) == 0:
            return ""
        joined_address_str = join_as_string(default["address"].tolist())
        return TOP_TOKEN_QUERY.format(joined_address_str=joined_address_str)

    The query is ready, and just one step remains: creating a deriver that uses this source.

    FromSQLDBDeriver(
        merge_how="right",
        base_col="address",
        source=SQLDBSource(url=DB_URL, table_name="address-token"),
        get_query_fn=top_token_query,
    )
  • tokens info: The other data we need is the token features: the unique sender and receiver counts.

    The query should be something like this:

    def token_info_query(default, *args, **kwargs):
        if len(default) == 0:
            return ""

        joined_address_str = join_as_string(default["token"])
        return f"""
        SELECT * FROM token_info WHERE address IN ({joined_address_str})
        """

    Now we define the deriver for this:

    FromSQLDBDeriver(
        base_col="token",
        source=SQLDBSource(
            url=DB_URL,
            table_name="tokens",
        ),
        get_query_fn=token_info_query,
    )

Finally, we create our online pipeline by adding the above derivers as pipes:

online_pipeline = Pipeline(
    pipes=[
        FromSQLDBDeriver(
            merge_how="right",
            base_col="address",
            source=SQLDBSource(url=DB_URL, table_name="address-token"),
            get_query_fn=top_token_query,
        ),
        FromSQLDBDeriver(
            base_col="token",
            source=SQLDBSource(
                url=DB_URL,
                table_name="tokens",
            ),
            get_query_fn=token_info_query,
        ),
    ]
)
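Before wiring the pipeline into the view, top_token_query can be sanity-checked without a running database by calling it with a small stand-in for the frame it receives. The Col and FakeFrame classes below are illustrative stand-ins, not Seshat types, and the helper and query function are repeated so the snippet runs on its own:

```python
from typing import Iterable


def join_as_string(values: Iterable[str], delimiter: str = ","):
    return delimiter.join(map(lambda addr: f"'{addr}'", values))


def top_token_query(default, *args, **kwargs):
    TOP_TOKEN_QUERY = """
    SELECT DISTINCT *
    FROM top_token
    WHERE address IN ({joined_address_str})
    """
    if len(default) == 0:
        return ""
    joined_address_str = join_as_string(default["address"].tolist())
    return TOP_TOKEN_QUERY.format(joined_address_str=joined_address_str)


class Col(list):
    # Minimal stand-in for a column object exposing .tolist().
    def tolist(self):
        return list(self)


class FakeFrame(dict):
    # Minimal stand-in for the frame passed in as `default`.
    def __len__(self):
        return len(next(iter(self.values()), []))


frame = FakeFrame(address=Col(["addr_1", "addr_2"]))
print(top_token_query(frame))        # SQL with IN ('addr_1','addr_2')
print(top_token_query(FakeFrame()))  # empty string when there are no rows
```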

Calling the Feature View

Now everything is ready, and we just need to place the pipeline and the source into the right place in the feature view:

class TokenRecommendation(FeatureView):
    online = True

    online_source = SQLDBSource(
        url=DB_URL, table_name="cosine_sim", query_fn=get_similar
    )
    online_pipeline = online_pipeline


view = TokenRecommendation()
view(limit=limit, target_address=target_address)

The result of the inference can be accessed through the data attribute:

data = view.data